// Copyright Epic Games, Inc. All Rights Reserved.

#include "httpasio.h"
#include "httptracer.h"

#include <zencore/except.h>
#include <zencore/logging.h>
#include <zencore/thread.h>
#include <zencore/trace.h>
#include <zenhttp/httpserver.h>

#include "httpparser.h"

#include <deque>
#include <memory>
#include <string_view>
#include <vector>

ZEN_THIRD_PARTY_INCLUDES_START
#if ZEN_PLATFORM_WINDOWS
#	include <conio.h>
#	include <mstcpip.h>
#endif
#include <asio.hpp>
ZEN_THIRD_PARTY_INCLUDES_END

#define ASIO_VERBOSE_TRACE 0

#if ASIO_VERBOSE_TRACE
#	define ZEN_TRACE_VERBOSE ZEN_TRACE
#else
#	define ZEN_TRACE_VERBOSE(fmtstr, ...)
#endif

namespace zen::asio_http {

using namespace std::literals;

struct HttpAcceptor;
struct HttpResponse;
struct HttpServerConnection;

inline LoggerRef
InitLogger()
{
	LoggerRef Logger = logging::Get("asio");
	// Logger.set_level(spdlog::level::trace);
	return Logger;
}

inline LoggerRef
Log()
{
	static LoggerRef g_Logger = InitLogger();
	return g_Logger;
}

//////////////////////////////////////////////////////////////////////////

struct HttpAsioServerImpl
{
public:
	HttpAsioServerImpl();
	~HttpAsioServerImpl();

	void		 Initialize(std::filesystem::path DataDir);
	int			 Start(uint16_t Port, bool ForceLooopback, int ThreadCount);
	void		 Stop();
	void		 RegisterService(const char* UrlPath, HttpService& Service);
	HttpService* RouteRequest(std::string_view Url);

	asio::io_service						 m_IoService;
	asio::io_service::work					 m_Work{m_IoService};
	std::unique_ptr<asio_http::HttpAcceptor> m_Acceptor;
	std::vector<std::thread>				 m_ThreadPool;

	LoggerRef		 m_RequestLog;
	HttpServerTracer m_RequestTracer;

	struct ServiceEntry
	{
		std::string	 ServiceUrlPath;
		HttpService* Service;
	};

	RwLock					  m_Lock;
	std::vector<ServiceEntry> m_UriHandlers;
};

/**
 * This is the class which request handlers use to interact with the server instance
 */

class HttpAsioServerRequest : public HttpServerRequest
{
public:
	HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer);
	~HttpAsioServerRequest();

	virtual Oid		 ParseSessionId() const override;
	virtual uint32_t ParseRequestId() const override;

	virtual IoBuffer ReadPayload() override;
	virtual void	 WriteResponse(HttpResponseCode ResponseCode) override;
	virtual void	 WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs) override;
	virtual void	 WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override;
	virtual void	 WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override;
	virtual bool	 TryGetRanges(HttpRanges& Ranges) override;

	using HttpServerRequest::WriteResponse;

	HttpAsioServerRequest(const HttpAsioServerRequest&) = delete;
	HttpAsioServerRequest& operator=(const HttpAsioServerRequest&) = delete;

	HttpRequestParser&			  m_Request;
	IoBuffer					  m_PayloadBuffer;
	std::unique_ptr<HttpResponse> m_Response;
};

struct HttpResponse
{
public:
	HttpResponse() = default;
	explicit HttpResponse(HttpContentType ContentType) : m_ContentType(ContentType) {}

	void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList)
	{
		ZEN_TRACE_CPU("asio::InitializeForPayload");

		m_ResponseCode			  = ResponseCode;
		const uint32_t ChunkCount = gsl::narrow<uint32_t>(BlobList.size());

		m_DataBuffers.reserve(ChunkCount);

		for (IoBuffer& Buffer : BlobList)
		{
#if 1
			m_DataBuffers.emplace_back(std::move(Buffer)).MakeOwned();
#else
			IoBuffer TempBuffer = std::move(Buffer);
			TempBuffer.MakeOwned();
			m_DataBuffers.emplace_back(IoBufferBuilder::ReadFromFileMaybe(TempBuffer));
#endif
		}

		uint64_t LocalDataSize = 0;

		m_AsioBuffers.push_back({});  // Placeholder for header

		for (IoBuffer& Buffer : m_DataBuffers)
		{
			uint64_t BufferDataSize = Buffer.Size();

			ZEN_ASSERT(BufferDataSize);

			LocalDataSize += BufferDataSize;

			IoBufferFileReference FileRef;
			if (Buffer.GetFileReference(/* out */ FileRef))
			{
				// TODO: Use direct file transfer, via TransmitFile/sendfile
				//
				// this looks like it requires some custom asio plumbing however

				m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()});
			}
			else
			{
				// Send from memory

				m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()});
			}
		}
		m_ContentLength = LocalDataSize;

		auto Headers	 = GetHeaders();
		m_AsioBuffers[0] = asio::const_buffer(Headers.data(), Headers.size());
	}

	uint16_t ResponseCode() const { return m_ResponseCode; }
	uint64_t ContentLength() const { return m_ContentLength; }

	const std::vector<asio::const_buffer>& AsioBuffers() const { return m_AsioBuffers; }

	std::string_view GetHeaders()
	{
		m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n"
				  << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n"
				  << "Content-Length: " << ContentLength() << "\r\n"sv;

		if (!m_IsKeepAlive)
		{
			m_Headers << "Connection: close\r\n"sv;
		}

		m_Headers << "\r\n"sv;

		return m_Headers;
	}

	void SuppressPayload() { m_AsioBuffers.resize(1); }

private:
	uint16_t						m_ResponseCode	= 0;
	bool							m_IsKeepAlive	= true;
	HttpContentType					m_ContentType	= HttpContentType::kBinary;
	uint64_t						m_ContentLength = 0;
	std::vector<IoBuffer>			m_DataBuffers;
	std::vector<asio::const_buffer> m_AsioBuffers;
	ExtendableStringBuilder<160>	m_Headers;
};

//////////////////////////////////////////////////////////////////////////

struct HttpServerConnection : public HttpRequestParserCallbacks, std::enable_shared_from_this<HttpServerConnection>
{
	HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket);
	~HttpServerConnection();

	std::shared_ptr<HttpServerConnection> AsSharedPtr() { return shared_from_this(); }

	// HttpConnectionBase implementation

	virtual void TerminateConnection() override;
	virtual void HandleRequest() override;

	void HandleNewRequest();

private:
	enum class RequestState
	{
		kInitialState,
		kInitialRead,
		kReadingMore,
		kWriting,
		kWritingFinal,
		kDone,
		kTerminated
	};

	RequestState	  m_RequestState = RequestState::kInitialState;
	HttpRequestParser m_RequestData{*this};

	void EnqueueRead();
	void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount);
	void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false);
	void CloseConnection();

	HttpAsioServerImpl&					   m_Server;
	asio::streambuf						   m_RequestBuffer;
	std::unique_ptr<asio::ip::tcp::socket> m_Socket;
	std::atomic<uint32_t>				   m_RequestCounter{0};
	uint32_t							   m_ConnectionId = 0;
	Ref<IHttpPackageHandler>			   m_PackageHandler;

	RwLock									  m_ResponsesLock;
	std::deque<std::unique_ptr<HttpResponse>> m_Responses;
};

std::atomic<uint32_t> g_ConnectionIdCounter{0};

HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket)
: m_Server(Server)
, m_Socket(std::move(Socket))
, m_ConnectionId(g_ConnectionIdCounter.fetch_add(1))
{
	ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId);
}

HttpServerConnection::~HttpServerConnection()
{
	ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId);
}

void
HttpServerConnection::HandleNewRequest()
{
	EnqueueRead();
}

void
HttpServerConnection::TerminateConnection()
{
	if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated)
	{
		return;
	}

	m_RequestState = RequestState::kTerminated;
	ZEN_ASSERT(m_Socket);

	// Terminating, we don't care about any errors when closing socket
	std::error_code Ec;
	m_Socket->shutdown(asio::socket_base::shutdown_both, Ec);
	m_Socket->close(Ec);
}

void
HttpServerConnection::EnqueueRead()
{
	if ((m_RequestState == RequestState::kInitialRead) || (m_RequestState == RequestState::kReadingMore))
	{
		m_RequestState = RequestState::kReadingMore;
	}
	else
	{
		m_RequestState = RequestState::kInitialRead;
	}

	m_RequestBuffer.prepare(64 * 1024);

	asio::async_read(*m_Socket.get(),
					 m_RequestBuffer,
					 asio::transfer_at_least(1),
					 [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnDataReceived(Ec, ByteCount); });
}

void
HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount)
{
	if (Ec)
	{
		switch (m_RequestState)
		{
			case RequestState::kDone:
			case RequestState::kInitialRead:
			case RequestState::kTerminated:
				ZEN_TRACE_VERBOSE("on data received ERROR (EXPECTED), connection: {}, reason: '{}'", m_ConnectionId, Ec.message());
				return;

			default:
				ZEN_WARN("on data received ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message());
				return TerminateConnection();
		}
	}

	ZEN_TRACE_VERBOSE("on data received, connection: {}, request: {}, thread: {}, bytes: {}",
					  m_ConnectionId,
					  m_RequestCounter.load(std::memory_order_relaxed),
					  zen::GetCurrentThreadId(),
					  NiceBytes(ByteCount));

	while (m_RequestBuffer.size())
	{
		const asio::const_buffer& InputBuffer = m_RequestBuffer.data();

		size_t Result = m_RequestData.ConsumeData((const char*)InputBuffer.data(), InputBuffer.size());
		if (Result == ~0ull)
		{
			return TerminateConnection();
		}

		m_RequestBuffer.consume(Result);
	}

	switch (m_RequestState)
	{
		case RequestState::kDone:
		case RequestState::kWritingFinal:
		case RequestState::kTerminated:
			break;

		default:
			EnqueueRead();
			break;
	}
}

void
HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, bool Pop)
{
	if (Ec)
	{
		ZEN_WARN("on data sent ERROR, connection: {}, reason: '{}'", m_ConnectionId, Ec.message());
		TerminateConnection();
	}
	else
	{
		ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}",
						  m_ConnectionId,
						  m_RequestCounter.load(std::memory_order_relaxed),
						  zen::GetCurrentThreadId(),
						  NiceBytes(ByteCount));

		if (!m_RequestData.IsKeepAlive())
		{
			CloseConnection();
		}
		else
		{
			if (Pop)
			{
				RwLock::ExclusiveLockScope _(m_ResponsesLock);
				m_Responses.pop_front();
			}

			m_RequestCounter.fetch_add(1);
		}
	}
}

void
HttpServerConnection::CloseConnection()
{
	if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated)
	{
		return;
	}
	ZEN_ASSERT(m_Socket);
	m_RequestState = RequestState::kDone;

	std::error_code Ec;
	m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec);
	if (Ec)
	{
		ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message());
	}
	m_Socket->close(Ec);
	if (Ec)
	{
		ZEN_WARN("socket close ERROR, reason '{}'", Ec.message());
	}
}

void
HttpServerConnection::HandleRequest()
{
	if (!m_RequestData.IsKeepAlive())
	{
		m_RequestState = RequestState::kWritingFinal;

		std::error_code Ec;
		m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec);

		if (Ec)
		{
			ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message());
		}
	}
	else
	{
		m_RequestState = RequestState::kWriting;
	}

	if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url()))
	{
		ZEN_TRACE_CPU("asio::HandleRequest");

		const uint32_t RequestNumber = m_RequestCounter.load(std::memory_order_relaxed);

		HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body());

		ZEN_TRACE_VERBOSE("handle request, connection: {}, request: {}'", m_ConnectionId, RequestNumber);

		const HttpVerb		   RequestVerb = Request.RequestVerb();
		const std::string_view Uri		   = Request.RelativeUri();

		if (m_Server.m_RequestLog.ShouldLog(logging::level::Trace))
		{
			ZEN_LOG_TRACE(m_Server.m_RequestLog,
						  "connection #{} Handling Request: {} {} ({} bytes ({}), accept: {})",
						  m_ConnectionId,
						  ToString(RequestVerb),
						  Uri,
						  Request.ContentLength(),
						  ToString(Request.RequestContentType()),
						  ToString(Request.AcceptContentType()));

			m_Server.m_RequestTracer.WriteDebugPayload(fmt::format("request_{}_{}.bin", m_ConnectionId, RequestNumber),
													   std::vector<IoBuffer>{Request.ReadPayload()});
		}

		if (!HandlePackageOffers(*Service, Request, m_PackageHandler))
		{
			try
			{
				Service->HandleRequest(Request);
			}
			catch (const AssertException& AssertEx)
			{
				// Drop any partially formatted response
				Request.m_Response.reset();

				ZEN_ERROR("Caught assert exception while handling request: {}", AssertEx.FullDescription());
				Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, AssertEx.FullDescription());
			}
			catch (const std::system_error& SystemError)
			{
				// Drop any partially formatted response
				Request.m_Response.reset();

				if (IsOOM(SystemError.code()) || IsOOD(SystemError.code()))
				{
					Request.WriteResponse(HttpResponseCode::InsufficientStorage, HttpContentType::kText, SystemError.what());
				}
				else
				{
					ZEN_ERROR("Caught system error exception while handling request: {}", SystemError.what());
					Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, SystemError.what());
				}
			}
			catch (const std::bad_alloc& BadAlloc)
			{
				// Drop any partially formatted response
				Request.m_Response.reset();

				Request.WriteResponse(HttpResponseCode::InsufficientStorage, HttpContentType::kText, BadAlloc.what());
			}
			catch (const std::exception& ex)
			{
				// Drop any partially formatted response
				Request.m_Response.reset();

				ZEN_ERROR("Caught exception while handling request: {}", ex.what());
				Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, ex.what());
			}
		}

		if (std::unique_ptr<HttpResponse> Response = std::move(Request.m_Response))
		{
			// Transmit the response

			if (m_RequestData.RequestVerb() == HttpVerb::kHead)
			{
				Response->SuppressPayload();
			}

			auto ResponseBuffers = Response->AsioBuffers();

			uint64_t ResponseLength = 0;

			for (auto& Buffer : ResponseBuffers)
			{
				ResponseLength += Buffer.size();
			}

			{
				RwLock::ExclusiveLockScope _(m_ResponsesLock);
				m_Responses.push_back(std::move(Response));
			}

			// TODO: should cork/uncork for Linux?

			{
				ZEN_TRACE_CPU("asio::async_write");
				asio::async_write(*m_Socket.get(),
								  ResponseBuffers,
								  asio::transfer_exactly(ResponseLength),
								  [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) {
									  Conn->OnResponseDataSent(Ec, ByteCount, true);
								  });
			}
			return;
		}
	}

	if (m_RequestData.RequestVerb() == HttpVerb::kHead)
	{
		std::string_view Response =
			"HTTP/1.1 404 NOT FOUND\r\n"
			"\r\n"sv;

		if (!m_RequestData.IsKeepAlive())
		{
			Response =
				"HTTP/1.1 404 NOT FOUND\r\n"
				"Connection: close\r\n"
				"\r\n"sv;
		}

		asio::async_write(
			*m_Socket.get(),
			asio::buffer(Response),
			[Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); });
	}
	else
	{
		std::string_view Response =
			"HTTP/1.1 404 NOT FOUND\r\n"
			"Content-Length: 23\r\n"
			"Content-Type: text/plain\r\n"
			"\r\n"
			"No suitable route found"sv;

		if (!m_RequestData.IsKeepAlive())
		{
			Response =
				"HTTP/1.1 404 NOT FOUND\r\n"
				"Content-Length: 23\r\n"
				"Content-Type: text/plain\r\n"
				"Connection: close\r\n"
				"\r\n"
				"No suitable route found"sv;
		}

		asio::async_write(
			*m_Socket.get(),
			asio::buffer(Response),
			[Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); });
	}
}

//////////////////////////////////////////////////////////////////////////

struct HttpAcceptor
{
	HttpAcceptor(HttpAsioServerImpl& Server, asio::io_service& IoService, uint16_t BasePort, bool ForceLoopback)
	: m_Server(Server)
	, m_IoService(IoService)
	, m_Acceptor(m_IoService, asio::ip::tcp::v6())
	, m_AlternateProtocolAcceptor(m_IoService, asio::ip::tcp::v4())
	{
		m_Acceptor.set_option(asio::ip::v6_only(false));
#if ZEN_PLATFORM_WINDOWS
		// Special option for Windows settings as !asio::socket_base::reuse_address is not the same as exclusive access on Windows platforms
		typedef asio::detail::socket_option::boolean<ASIO_OS_DEF(SOL_SOCKET), SO_EXCLUSIVEADDRUSE> exclusive_address;
		m_Acceptor.set_option(exclusive_address(true));
		m_AlternateProtocolAcceptor.set_option(exclusive_address(true));
#else	// ZEN_PLATFORM_WINDOWS
		m_Acceptor.set_option(asio::socket_base::reuse_address(false));
		m_AlternateProtocolAcceptor.set_option(asio::socket_base::reuse_address(false));
#endif	// ZEN_PLATFORM_WINDOWS

		m_Acceptor.set_option(asio::ip::tcp::no_delay(true));
		m_Acceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024));
		m_Acceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024));

		m_AlternateProtocolAcceptor.set_option(asio::ip::tcp::no_delay(true));
		m_AlternateProtocolAcceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024));
		m_AlternateProtocolAcceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024));

		asio::ip::address_v6 BindAddress   = ForceLoopback ? asio::ip::address_v6::loopback() : asio::ip::address_v6::any();
		uint16_t			 EffectivePort = BasePort;

		if (BindAddress.is_loopback())
		{
			m_Acceptor.set_option(asio::ip::v6_only(true));
		}

		asio::error_code BindErrorCode;
		m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode);
		if (BindErrorCode == asio::error::access_denied && !BindAddress.is_loopback())
		{
			// Access denied for a public port - lets try fall back to local port only
			BindAddress = asio::ip::address_v6::loopback();
			m_Acceptor.set_option(asio::ip::v6_only(true));
			m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode);
		}
		if (BindErrorCode == asio::error::address_in_use)
		{
			// Do a retry after a short sleep on same port just to be sure
			ZEN_INFO("Desired port %d is in use, retrying", BasePort);
			Sleep(100);
			m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode);
		}
		// Sharing violation implies the port is being used by another process
		for (uint16_t PortOffset = 1; (BindErrorCode == asio::error::address_in_use) && (PortOffset < 10); ++PortOffset)
		{
			EffectivePort = BasePort + (PortOffset * 100);
			m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode);
		}
		if (BindErrorCode == asio::error::access_denied)
		{
			EffectivePort = 0;
			m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode);
		}
		if (BindErrorCode)
		{
			ZEN_ERROR("Unable open asio service, error '{}'", BindErrorCode.message());
		}
		else if (BindAddress.is_loopback())
		{
			m_AlternateProtocolAcceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), EffectivePort), BindErrorCode);
			m_UseAlternateProtocolAcceptor = true;
			ZEN_INFO("Registered local-only handler 'http://{}:{}/' - this is not accessible from remote hosts",
					 "localhost",
					 EffectivePort);
		}

#if ZEN_PLATFORM_WINDOWS
		// On Windows, loopback connections can take advantage of a faster code path optionally with this flag.
		// This must be used by both the client and server side, and is only effective in the absence of
		// Windows Filtering Platform (WFP) callouts which can be installed by security software.
		// https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path
		SOCKET NativeSocket				   = m_Acceptor.native_handle();
		int	   LoopbackOptionValue		   = 1;
		DWORD  OptionNumberOfBytesReturned = 0;
		WSAIoctl(NativeSocket,
				 SIO_LOOPBACK_FAST_PATH,
				 &LoopbackOptionValue,
				 sizeof(LoopbackOptionValue),
				 NULL,
				 0,
				 &OptionNumberOfBytesReturned,
				 0,
				 0);
		if (m_UseAlternateProtocolAcceptor)
		{
			NativeSocket = m_AlternateProtocolAcceptor.native_handle();
			WSAIoctl(NativeSocket,
					 SIO_LOOPBACK_FAST_PATH,
					 &LoopbackOptionValue,
					 sizeof(LoopbackOptionValue),
					 NULL,
					 0,
					 &OptionNumberOfBytesReturned,
					 0,
					 0);
		}
#endif
		m_Acceptor.listen();
		if (m_UseAlternateProtocolAcceptor)
		{
			m_AlternateProtocolAcceptor.listen();
		}

		ZEN_INFO("Started asio server at 'http://{}:{}'", BindAddress.is_loopback() ? "[::1]" : "*", EffectivePort);
	}

	~HttpAcceptor()
	{
		m_Acceptor.close();
		if (m_UseAlternateProtocolAcceptor)
		{
			m_AlternateProtocolAcceptor.close();
		}
	}

	void Start()
	{
		ZEN_ASSERT(!m_IsStopped);
		InitAcceptInternal(m_Acceptor);
		if (m_UseAlternateProtocolAcceptor)
		{
			InitAcceptInternal(m_AlternateProtocolAcceptor);
		}
	}

	void StopAccepting() { m_IsStopped = true; }

	int GetAcceptPort() { return m_Acceptor.local_endpoint().port(); }

private:
	void InitAcceptInternal(asio::ip::tcp::acceptor& Acceptor)
	{
		auto				   SocketPtr = std::make_unique<asio::ip::tcp::socket>(m_IoService);
		asio::ip::tcp::socket& SocketRef = *SocketPtr.get();

		Acceptor.async_accept(SocketRef, [this, &Acceptor, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable {
			if (Ec)
			{
				ZEN_WARN("asio async_accept, connection failed to '{}:{}' reason '{}'",
						 Acceptor.local_endpoint().address().to_string(),
						 Acceptor.local_endpoint().port(),
						 Ec.message());
			}
			else
			{
				// New connection established, pass socket ownership into connection object
				// and initiate request handling loop. The connection lifetime is
				// managed by the async read/write loop by passing the shared
				// reference to the callbacks.

				Socket->set_option(asio::ip::tcp::no_delay(true));
				Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024));
				Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024));

				auto Conn = std::make_shared<HttpServerConnection>(m_Server, std::move(Socket));
				Conn->HandleNewRequest();
			}

			if (!m_IsStopped.load())
			{
				InitAcceptInternal(Acceptor);
			}
			else
			{
				std::error_code CloseEc;
				Acceptor.close(CloseEc);
				if (CloseEc)
				{
					ZEN_WARN("acceptor close ERROR, reason '{}'", CloseEc.message());
				}
			}
		});
	}

	HttpAsioServerImpl&		m_Server;
	asio::io_service&		m_IoService;
	asio::ip::tcp::acceptor m_Acceptor;
	asio::ip::tcp::acceptor m_AlternateProtocolAcceptor;
	bool					m_UseAlternateProtocolAcceptor{false};
	std::atomic<bool>		m_IsStopped{false};
};

//////////////////////////////////////////////////////////////////////////

HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer)
: m_Request(Request)
, m_PayloadBuffer(std::move(PayloadBuffer))
{
	const int PrefixLength = Service.UriPrefixLength();

	std::string_view Uri = Request.Url();
	Uri.remove_prefix(std::min(PrefixLength, static_cast<int>(Uri.size())));
	m_Uri			   = Uri;
	m_UriWithExtension = Uri;
	m_QueryString	   = Request.QueryString();

	m_Verb			= Request.RequestVerb();
	m_ContentLength = Request.Body().Size();
	m_ContentType	= Request.ContentType();

	HttpContentType AcceptContentType = HttpContentType::kUnknownContentType;

	// Parse any extension, to allow requesting a particular response encoding via the URL

	{
		std::string_view UriSuffix8{m_Uri};

		const size_t LastComponentIndex = UriSuffix8.find_last_of('/');

		if (LastComponentIndex != std::string_view::npos)
		{
			UriSuffix8.remove_prefix(LastComponentIndex);
		}

		const size_t LastDotIndex = UriSuffix8.find_last_of('.');

		if (LastDotIndex != std::string_view::npos)
		{
			UriSuffix8.remove_prefix(LastDotIndex + 1);

			AcceptContentType = ParseContentType(UriSuffix8);

			if (AcceptContentType != HttpContentType::kUnknownContentType)
			{
				m_Uri.remove_suffix(uint32_t(UriSuffix8.size() + 1));
			}
		}
	}

	// It an explicit content type extension was specified then we'll use that over any
	// Accept: header value that may be present

	if (AcceptContentType != HttpContentType::kUnknownContentType)
	{
		m_AcceptType = AcceptContentType;
	}
	else
	{
		m_AcceptType = Request.AcceptType();
	}
}

HttpAsioServerRequest::~HttpAsioServerRequest()
{
}

Oid
HttpAsioServerRequest::ParseSessionId() const
{
	return m_Request.SessionId();
}

uint32_t
HttpAsioServerRequest::ParseRequestId() const
{
	return m_Request.RequestId();
}

IoBuffer
HttpAsioServerRequest::ReadPayload()
{
	return m_PayloadBuffer;
}

void
HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode)
{
	ZEN_ASSERT(!m_Response);

	m_Response.reset(new HttpResponse(HttpContentType::kBinary));
	std::array<IoBuffer, 0> Empty;

	m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty);
}

void
HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs)
{
	ZEN_ASSERT(!m_Response);

	m_Response.reset(new HttpResponse(ContentType));
	m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs);
}

void
HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString)
{
	ZEN_ASSERT(!m_Response);
	m_Response.reset(new HttpResponse(ContentType));

	IoBuffer				MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size());
	std::array<IoBuffer, 1> SingleBufferList({MessageBuffer});

	m_Response->InitializeForPayload((uint16_t)ResponseCode, SingleBufferList);
}

void
HttpAsioServerRequest::WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler)
{
	ZEN_ASSERT(!m_Response);

	// Not one bit async, innit
	ContinuationHandler(*this);
}

bool
HttpAsioServerRequest::TryGetRanges(HttpRanges& Ranges)
{
	return TryParseHttpRangeHeader(m_Request.RangeHeader(), Ranges);
}

//////////////////////////////////////////////////////////////////////////

HttpAsioServerImpl::HttpAsioServerImpl() : m_RequestLog(logging::Get("http_requests"))
{
}

HttpAsioServerImpl::~HttpAsioServerImpl()
{
}

void
HttpAsioServerImpl::Initialize(std::filesystem::path DataDir)
{
	m_RequestTracer.Initialize(DataDir);
}

int
HttpAsioServerImpl::Start(uint16_t Port, bool ForceLooopback, int ThreadCount)
{
	ZEN_ASSERT(ThreadCount > 0);

	ZEN_INFO("starting asio http with {} service threads", ThreadCount);

	m_Acceptor.reset(new asio_http::HttpAcceptor(*this, m_IoService, Port, ForceLooopback));
	m_Acceptor->Start();

	// This should consist of a set of minimum threads and grow on demand to
	// meet concurrency needs? Right now we end up allocating a large number
	// of threads even if we never end up using all of them, which seems
	// wasteful. It's also not clear how the demand for concurrency should
	// be balanced with the engine side - ideally we'd have some kind of
	// global scheduling to prevent one side from preventing the other side
	// from making progress. Or at the very least, thread priorities should
	// be considered.

	for (int i = 0; i < ThreadCount; ++i)
	{
		m_ThreadPool.emplace_back([this, Index = i + 1] {
			SetCurrentThreadName(fmt::format("asio_io_{}", Index));

			try
			{
				m_IoService.run();
			}
			catch (const AssertException& AssertEx)
			{
				ZEN_ERROR("Assert caught in asio event loop: {}", AssertEx.FullDescription());
			}
			catch (const std::exception& e)
			{
				ZEN_ERROR("Exception caught in asio event loop: '{}'", e.what());
			}
		});
	}

	ZEN_INFO("asio http started (port {})", m_Acceptor->GetAcceptPort());

	return m_Acceptor->GetAcceptPort();
}

void
HttpAsioServerImpl::Stop()
{
	if (m_Acceptor)
	{
		m_Acceptor->StopAccepting();
	}
	m_IoService.stop();
	for (auto& Thread : m_ThreadPool)
	{
		if (Thread.joinable())
		{
			Thread.join();
		}
	}
	m_ThreadPool.clear();
	m_Acceptor.reset();
}

void
HttpAsioServerImpl::RegisterService(const char* InUrlPath, HttpService& Service)
{
	std::string_view UrlPath(InUrlPath);
	Service.SetUriPrefixLength(UrlPath.size());
	if (!UrlPath.empty() && UrlPath.back() == '/')
	{
		UrlPath.remove_suffix(1);
	}

	RwLock::ExclusiveLockScope _(m_Lock);
	m_UriHandlers.push_back({std::string(UrlPath), &Service});
}

HttpService*
HttpAsioServerImpl::RouteRequest(std::string_view Url)
{
	RwLock::SharedLockScope _(m_Lock);

	HttpService*		   CandidateService	  = nullptr;
	std::string::size_type CandidateMatchSize = 0;
	for (const ServiceEntry& SvcEntry : m_UriHandlers)
	{
		const std::string&			 SvcUrl		= SvcEntry.ServiceUrlPath;
		const std::string::size_type SvcUrlSize = SvcUrl.size();
		if ((SvcUrlSize >= CandidateMatchSize) && Url.compare(0, SvcUrlSize, SvcUrl) == 0 &&
			((SvcUrlSize == Url.size()) || (Url[SvcUrlSize] == '/')))
		{
			CandidateMatchSize = SvcUrl.size();
			CandidateService   = SvcEntry.Service;
		}
	}

	return CandidateService;
}

}  // namespace zen::asio_http

//////////////////////////////////////////////////////////////////////////

namespace zen {

class HttpAsioServer : public HttpServer
{
public:
	HttpAsioServer(bool ForceLoopback, unsigned int ThreadCount);
	~HttpAsioServer();

	virtual void RegisterService(HttpService& Service) override;
	virtual int	 Initialize(int BasePort, std::filesystem::path DataDir) override;
	virtual void Run(bool IsInteractiveSession) override;
	virtual void RequestExit() override;
	virtual void Close() override;

private:
	Event		 m_ShutdownEvent;
	int			 m_BasePort		 = 0;
	bool		 m_ForceLoopback = false;
	unsigned int m_ThreadCount	 = 0;

	std::unique_ptr<asio_http::HttpAsioServerImpl> m_Impl;
};

HttpAsioServer::HttpAsioServer(bool ForceLoopback, unsigned int ThreadCount)
: m_ForceLoopback(ForceLoopback)
, m_ThreadCount(ThreadCount != 0 ? ThreadCount : Max(std::thread::hardware_concurrency(), 8u))
, m_Impl(std::make_unique<asio_http::HttpAsioServerImpl>())
{
	ZEN_DEBUG("Request object size: {} ({:#x})", sizeof(HttpRequestParser), sizeof(HttpRequestParser));
}

HttpAsioServer::~HttpAsioServer()
{
	if (m_Impl)
	{
		ZEN_ERROR("~HttpAsioServer() called without calling Close() first");
	}
}

void
HttpAsioServer::Close()
{
	try
	{
		m_Impl->Stop();
	}
	catch (const std::exception& ex)
	{
		ZEN_WARN("Caught exception stopping http asio server: {}", ex.what());
	}
	m_Impl.reset();
}

void
HttpAsioServer::RegisterService(HttpService& Service)
{
	m_Impl->RegisterService(Service.BaseUri(), Service);
}

int
HttpAsioServer::Initialize(int BasePort, std::filesystem::path DataDir)
{
	m_Impl->Initialize(DataDir);

	m_BasePort = m_Impl->Start(gsl::narrow<uint16_t>(BasePort), m_ForceLoopback, m_ThreadCount);

	return m_BasePort;
}

void
HttpAsioServer::Run(bool IsInteractive)
{
	const bool TestMode = !IsInteractive;

	int WaitTimeout = -1;
	if (!TestMode)
	{
		WaitTimeout = 1000;
	}

#if ZEN_PLATFORM_WINDOWS
	if (TestMode == false)
	{
		ZEN_CONSOLE("Zen Server running (asio HTTP). Press ESC or Q to quit");
	}

	do
	{
		if (!TestMode && _kbhit() != 0)
		{
			char c = (char)_getch();

			if (c == 27 || c == 'Q' || c == 'q')
			{
				RequestApplicationExit(0);
			}
		}

		m_ShutdownEvent.Wait(WaitTimeout);
	} while (!IsApplicationExitRequested());
#else
	if (TestMode == false)
	{
		ZEN_CONSOLE("Zen Server running (asio HTTP). Ctrl-C to quit");
	}

	do
	{
		m_ShutdownEvent.Wait(WaitTimeout);
	} while (!IsApplicationExitRequested());
#endif
}

void
HttpAsioServer::RequestExit()
{
	m_ShutdownEvent.Set();
}

Ref<HttpServer>
CreateHttpAsioServer(bool ForceLoopback, unsigned int ThreadCount)
{
	return Ref<HttpServer>{new HttpAsioServer(ForceLoopback, ThreadCount)};
}

}  // namespace zen
